package com.atguigu.flink.charkoer05;

import com.alibaba.fastjson.JSON;
import com.atguigu.flink.been.WaterSensor;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.ArrayList;

/**
 * Demo job that writes {@link WaterSensor} records to Redis through {@link RedisSink}.
 *
 * <p>A fixed in-memory collection of readings is keyed by sensor id, the {@code vc} field is
 * aggregated with {@code sum("vc")}, and every record emitted by that aggregation is stored in
 * Redis with the {@code SET} command: the sensor id is the Redis key and the JSON form of the
 * record is the value.
 */
public class FlinkUseRedisSink {

    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the Flink job fails; the original failure is kept as the cause
     */
    public static void main(String[] args) {
        // Sample input for the job.
        ArrayList<WaterSensor> waterSensors = new ArrayList<>();
        waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 30));
        waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
        waterSensors.add(new WaterSensor("sensor_1", 1607527994001L, 50));
        waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
        waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
        waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

        // getExecutionEnvironment() is a static method declared on StreamExecutionEnvironment;
        // call it on the declaring class rather than through the client-side subclass.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connection settings for a single Redis instance.
        FlinkJedisConfigBase config = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop162")
                .setPort(6379)
                .build();

        env.fromCollection(waterSensors)
                .keyBy(WaterSensor::getId)
                .sum("vc")
                .addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
                    // Return the Redis command descriptor.
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        // No additional key is supplied for the plain SET command.
                        return new RedisCommandDescription(RedisCommand.SET, null);
                    }

                    // Return the key to write: the sensor id.
                    @Override
                    public String getKeyFromData(WaterSensor waterSensor) {
                        return waterSensor.getId();
                    }

                    // Return the value to write: the record serialized as JSON.
                    @Override
                    public String getValueFromData(WaterSensor waterSensor) {
                        return JSON.toJSONString(waterSensor);
                    }
                }));

        // The calls above only describe the dataflow; nothing runs (and nothing reaches Redis)
        // until the job is explicitly executed.
        try {
            env.execute();
        } catch (Exception e) {
            throw new IllegalStateException("Flink job FlinkUseRedisSink failed", e);
        }
    }
}
